#!/usr/bin/env python3
# Copyright 2020-2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# %% [markdown]
# # Data passing tutorial
# Data passing is the most important aspect of Pipelines.
#
# In Kubeflow Pipelines, the pipeline authors compose pipelines by creating component instances (tasks) and connecting them together.
#
# Components have inputs and outputs. They can consume and produce arbitrary data.
#
# Pipeline authors establish connections between component tasks by connecting their data inputs and outputs - by passing the output of one task as an argument to another task's input.
#
# The system takes care of storing the data produced by components and later passing that data to other components for consumption as instructed by the pipeline.
#
# This tutorial shows how to create python components that produce, consume and transform data.
# It shows how to create data passing pipelines by instantiating components and connecting them together.

# %%

from typing import Dict, List

from kfp import compiler
from kfp import dsl
from kfp.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component


@component
def preprocess(
    # An input parameter of type string.
    message: str,
    # Use Output[T] to get a metadata-rich handle to the output artifact
    # of type `Dataset`.
    output_dataset_one: Output[Dataset],
    # A locally accessible filepath for another output artifact of type
    # `Dataset`.
    output_dataset_two_path: OutputPath('Dataset'),
    # A locally accessible filepath for an output parameter of type string.
    output_parameter_path: OutputPath(str),
    # A locally accessible filepath for an output parameter of type bool.
    output_bool_parameter_path: OutputPath(bool),
    # A locally accessible filepath for an output parameter of type dict.
    output_dict_parameter_path: OutputPath(Dict[str, int]),
    # A locally accessible filepath for an output parameter of type list.
    output_list_parameter_path: OutputPath(List[str]),
):
    """Dummy preprocessing step."""

    # Use Dataset.path to access a local file path for writing.
    # One can also use Dataset.uri to access the actual URI file path.
    with open(output_dataset_one.path, 'w') as f:
        f.write(message)

    # OutputPath is used to just pass the local file path of the output artifact
    # to the function.
    with open(output_dataset_two_path, 'w') as f:
        f.write(message)

    with open(output_parameter_path, 'w') as f:
        f.write(message)

    with open(output_bool_parameter_path, 'w') as f:
        f.write(
            str(True))  # use either `str()` or `json.dumps()` for bool values.

    import json
    with open(output_dict_parameter_path, 'w') as f:
        f.write(json.dumps({'A': 1, 'B': 2}))

    with open(output_list_parameter_path, 'w') as f:
        f.write(json.dumps(['a', 'b', 'c']))


@component
def train(
    # Use InputPath to get a locally accessible path for the input artifact
    # of type `Dataset`.
    dataset_one_path: InputPath('Dataset'),
    # Use Input[T] to get a metadata-rich handle to the input artifact
    # of type `Dataset`.
    dataset_two: Input[Dataset],
    # An input parameter of type string.
    message: str,
    # Use Output[T] to get a metadata-rich handle to the output artifact
    # of type `Model`.
    model: Output[Model],
    # An input parameter of type bool.
    input_bool: bool,
    # An input parameter of type dict.
    input_dict: Dict[str, int],
    # An input parameter of type List[str].
    input_list: List[str],
    # An input parameter of type int with a default value.
    num_steps: int = 100,
):
    """Dummy Training step."""
    with open(dataset_one_path, 'r') as input_file:
        dataset_one_contents = input_file.read()

    with open(dataset_two.path, 'r') as input_file:
        dataset_two_contents = input_file.read()

    line = (f'dataset_one_contents: {dataset_one_contents} || '
            f'dataset_two_contents: {dataset_two_contents} || '
            f'message: {message} || '
            f'input_bool: {input_bool}, type {type(input_bool)} || '
            f'input_dict: {input_dict}, type {type(input_dict)} || '
            f'input_list: {input_list}, type {type(input_list)} \n')

    with open(model.path, 'w') as output_file:
        for i in range(num_steps):
            output_file.write('Step {}\n{}\n=====\n'.format(i, line))

    # model is an instance of Model artifact, which has a .metadata dictionary
    # to store arbitrary metadata for the output artifact.
    model.metadata['accuracy'] = 0.9


@dsl.pipeline(pipeline_root='', name='tutorial-data-passing')
def data_passing_pipeline(message: str = 'message'):
    preprocess_task = preprocess(message=message)
    train_task = train(
        dataset_one_path=preprocess_task.outputs['output_dataset_one'],
        dataset_two=preprocess_task.outputs['output_dataset_two_path'],
        message=preprocess_task.outputs['output_parameter_path'],
        input_bool=preprocess_task.outputs['output_bool_parameter_path'],
        input_dict=preprocess_task.outputs['output_dict_parameter_path'],
        input_list=preprocess_task.outputs['output_list_parameter_path'],
    )


if __name__ == '__main__':
    compiler.Compiler().compile(data_passing_pipeline, __file__ + '.yaml')
